跳到主要内容

RabbitMQ 常用的工作模式学习

参考资料 官方文档 RabbitMQ Tutorials 参考资料 RabbitMQ的六种工作模式

配置 Maven 依赖

这篇笔记主要使用 Java 演示 RabbitMQ 的六种工作模式,所以得先配置一下环境

<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.9.0</version>
</dependency>

channel 信道

概念:信道是生产消费者与 rabbit 通信的渠道,生产者 publish 或是消费者 subscribe 一个队列都是通过信道来通信的。信道是建立在TCP连接上的虚拟连接,什么意思呢?

就是说rabbitmq在一条TCP上建立成百上千个信道来达到多个线程处理,这个TCP被多个线程共享,每个线程对应一个信道,信道在rabbit都有唯一的ID ,保证了信道私有性,对应上唯一的线程使用。

疑问:为什么不建立多个TCP连接呢?原因是rabbit保证性能,系统为每个线程开辟一个TCP是非常消耗性能, 每秒成百上千的建立销毁TCP会严重消耗系统。所以rabbitmq选择建立多个信道(建立在tcp的虚拟连接) 连接到rabbit上。

类似概念:TCP是电缆,信道就是里面的光纤,每个光纤都是独立的,互不影响。

确认机制(ack)

1、发送方确认模式:消息发送到交换器 > 发送完毕 > 消息投递到队列或持久化到磁盘异步回调通知生产者

2、消费者确认机制:消息投递消费者 > ack > 删除该条消息 > 投递下一条

注:收到ACK前,不会把消息再次发送给该消费者,但是会把下一条消息发送给其他消费者

simple 简单模式

注意:这种模式是点对点的,所以一个生产者对应一个消费者

最简单的一个消费者和一个生产者的模式,生产者生成消息,消费者监听消息,若是消费者监听到它所需要的消息,就会消费该消息,这种消息是一次性的,被消费了就没有了。

image.png

生产数据

 @Test
public void testSendMessage() throws IOException, TimeoutException {
// 创建一个 MQ 的连接工厂对象
ConnectionFactory connectionFactory = new ConnectionFactory();
// 创建连接 rabbitmq 的主机(注意!!这里有个坑,不能使用 127.0.0.1,应该使用 localhost 才行)
connectionFactory.setHost("localhost");
// 设置端口号(注意不是 15672)
connectionFactory.setPort(5672);
// 设置要连接的虚拟主机
connectionFactory.setVirtualHost("/ems");
// 设置访问虚拟主机的用户名和密码(必须使用密码登陆,所以别使用无密码的账户)
connectionFactory.setUsername("ems");
connectionFactory.setPassword("ems");

// 通过上面的连接工厂获取连接对象
Connection connection = connectionFactory.newConnection();
// 获取连接通道
Channel channel = connection.createChannel();
// 绑定对应的消息队列
// 参数1:队列名称(如果不存在则会自动创建)
// 参数2:设置是否持久化(true 持久化)
// 参数3:exclusive 设置是否独占
// 参数4:autoDelete 设置是否删除
// 参数5:附加参数
channel.queueDeclare("hello",false,false,false,null);


// 发布消息(注意:上面只是绑定数据到队列上)
// 参数一:交换机名称
// 参数二:发布的队列
// 参数三:属性设置(MessageProperties.PERSISTENT_TEXT_PLAIN 设置当前消息的持久化)
// 参数四:发布的具体内容(需要使用 Byte类型)
channel.basicPublish("","hello",null,"hello rabbitMQ".getBytes());

channel.close();
connectionFactory.clone();
}

注意:设置持久化时队列持久化不代表消息持久化,且生产者和消费者设置的参数必须是一样的,例如生产者设置为持久化,消费者也必须加上持久化这个属性

消费数据

注意:这个不能使用 Junit,因为当执行到回调函数之前单元测试的线程就结束了,所以应该放在 main 函数里面执行

ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/ems");
connectionFactory.setUsername("ems");
connectionFactory.setPassword("ems");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
// 绑定对应的消息队列
channel.queueDeclare("hello", false, false, false, null);

// 消费消息
channel.basicConsume("hello", true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(new String(body));
}
});

// 注意:因为那个回调函数是 “异步” 的,所以这里就不用关闭通道

抽象成工具类

因为每次都要重复的编写这个连接,所以将其抽象成工具类

public class RabbitMQConnection {
private RabbitMQConnection(){}

private static final ConnectionFactory connectionFactory;
static {
// 创建一个 MQ 的连接工厂对象
connectionFactory = new ConnectionFactory();
// 创建连接 rabbitmq 的主机
connectionFactory.setHost("localhost");
// 设置端口号
connectionFactory.setPort(5672);
// 设置要连接的虚拟主机
connectionFactory.setVirtualHost("/ems");
// 设置访问虚拟主机的用户名和密码(因为没有设置密码,所以直接略过)
connectionFactory.setUsername("ems");
connectionFactory.setPassword("ems");
}

public static Connection getConnection() throws IOException, TimeoutException {
// 通过上面的连接工厂获取连接对象
return connectionFactory.newConnection();
}

// 关闭
public static void closeConnectionAndChannel(Channel channel,Connection connection) throws IOException, TimeoutException {
if (channel != null) {
channel.close();
}

if (connection != null) {
connection.close();
}
}
}

工作模型(Work Queue)

如图,这种模型一个队列对应多个消费者,队列中的消息一旦被消费就会消失,确保了任务不会被重复执行

image.png

注意:Rabbit 队列默认情况下是平均分配消息的,例如两个消费者,就对半分这些消息,所以就有一个问题,慢的和快的消费者消费速度不同,但是分配的消息却是相同的,造成了性能的浪费(速度快的消费完消息,而慢的还在消费)

为了解决这个问题:需要设置一次只能消费一条数据,且关闭消费者的自动确认机制(就是收到消息自动删除队列里的消息,而不去考虑是否真的消费完成)

// 设置为一次只能消费一条数据
channel.basicQos(1);

// 使用第二个参数关闭它就好了
channel.basicConsume("work",false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者-1" + new String(body));
// 手动确认消息已经消费
// 参数一:确认的消息标签(确认队列中某个具体的消息) 参数二:是否开启多个消息同时确认
channel.basicAck(envelope.getDeliveryTag(),false);
}
});

生产者

public class Provider {
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = RabbitMQConnection.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("work",true,false,false,null);

for (int i = 0; i < 10; i++) {
channel.basicPublish("","work",null,("number" + i + "work Queue").getBytes());
}

RabbitMQConnection.closeConnectionAndChannel(channel,connection);
}
}

消费者01

public class Customer1 {
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = RabbitMQConnection.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("work",true,false,false,null);
// 设置为一次只能消费一条数据
channel.basicQos(1);

// 使用第二个参数关闭它就好了
channel.basicConsume("work",false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者-1" + new String(body));
// 手动确认消息已经消费
// 参数一:确认的消息标签(确认队列中某个具体的消息) 参数二:是否开启多个消息同时确认
channel.basicAck(envelope.getDeliveryTag(),false);
}
});
}

消费者02

public class Customer2 {
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = RabbitMQConnection.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("work",true,false,false,null);
channel.basicQos(1);
channel.basicConsume("work",false, new DefaultConsumer(channel) {
@SneakyThrows
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// 这里做个延迟
Thread.sleep(10);
System.out.println("消费者-2" + new String(body));
channel.basicAck(envelope.getDeliveryTag(),false);
}
});
}
}

发布订阅模型

又称 “广播” 模型(fanout)

image.png

特点:

  • 可以多消费者
  • 每个消费者都有自己的队列
  • 每个队列都要绑定到交换机上
  • 交换机把消息发送给绑定过的所有队列(广播)

如下:生产者发送一条消息,下面绑定了该交换机的消费者都能收到这条消息

生产者

public class Provider {
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = RabbitMQConnection.getConnection();
Channel channel = connection.createChannel();
// 指定交换机
// 参数一:交换机名称 参数二:交换机类型(这里的 fanout 是广播)
channel.exchangeDeclare("test","fanout");
// 发送消息到交换机
channel.basicPublish("test","",null,"test fanout exchange message!".getBytes());
RabbitMQConnection.closeConnectionAndChannel(channel,connection);
}
}

消费者01

public class Customer1 {
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = RabbitMQConnection.getConnection();
Channel channel = connection.createChannel();
// 绑定交换机
channel.exchangeDeclare("test","fanout");
// 创建一个临时的队列
String queueName = channel.queueDeclare().getQueue();
// 绑定交换机和临时队列
channel.queueBind(queueName,"test","");

// 消费消息
channel.basicConsume(queueName,true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者-1" + new String(body));
}
});
}
}

消费者02

public class Customer2 {
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = RabbitMQConnection.getConnection();
Channel channel = connection.createChannel();
// 绑定交换机
channel.exchangeDeclare("test","fanout");
// 创建一个临时的队列
String queueName = channel.queueDeclare().getQueue();
// 绑定交换机和临时队列
channel.queueBind(queueName,"test","");

// 消费消息
channel.basicConsume(queueName,true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者-2" + new String(body));
}
});
}
}

路由模型-直连(Direct)

image.png

在 Fanout 模型中,一条消息会被所有订阅的队列消费。如果希望不同的消息被不同的队列消费就要使用到 Direct 类型的 Exchange

在这种直连模型下:

  • 队列与交换机绑定需要加上一个 RoutingKey
  • 消息的发送方在向 Exchange 发送消息时,也必须指定消息的 RoutingKey
  • Exchange 不再把消息交给每一个绑定的队列,而是根据 RoutingKey 来判断

生产者

public class Provider {
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = RabbitMQConnection.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare("logs_direct","direct");
// 发送消息
String routingKey = "info";
channel.basicPublish("logs_direct",routingKey,null,("test direct model. this key:[" + routingKey + "]").getBytes());
RabbitMQConnection.closeConnectionAndChannel(channel,connection);
}
}

消费者01

public class Customer1 {
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = RabbitMQConnection.getConnection();
Channel channel = connection.createChannel();
// 绑定交换机
channel.exchangeDeclare("logs_direct","direct");
// 创建一个临时的队列
String queueName = channel.queueDeclare().getQueue();
// 绑定交换机和临时队列(绑定多种类型的 routing key)
channel.queueBind(queueName,"logs_direct","error");
channel.queueBind(queueName,"logs_direct","info");
channel.queueBind(queueName,"logs_direct","warning");
// 消费消息
channel.basicConsume(queueName,true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者-1" + new String(body));
}
});
}
}

注意:下面只绑定了一个 routingKey

消费者02

public class Customer2 {
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = RabbitMQConnection.getConnection();
Channel channel = connection.createChannel();
// 绑定交换机
channel.exchangeDeclare("logs_direct","direct");
// 创建一个临时的队列
String queueName = channel.queueDeclare().getQueue();
// 绑定交换机和临时队列
channel.queueBind(queueName,"logs_direct","info");
// 消费消息
channel.basicConsume(queueName,true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者-2" + new String(body));
}
});
}
}

路由模型-直连(Topics)

image.png 动态路由模型,就是上面那种 Direct 模型的改进版,使之支持统配符了

因为 Routing Key 一般都是由一个或多个词组成,多个词之间使用 . 进行分隔,所以下面的通配符就是基于这个进行匹配

  • * (star) 代替一个词
  • # (hash) 代替多个词

生产者

public class Provider {
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = RabbitMQConnection.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare("logs_topic","topic");
// 发送消息
String routingKey = "info";
channel.basicPublish("logs_topic",routingKey,null,("test topic model. this key:[" + routingKey + "]").getBytes());
RabbitMQConnection.closeConnectionAndChannel(channel,connection);
}
}

消费者

public class Customer1 {
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = RabbitMQConnection.getConnection();
Channel channel = connection.createChannel();

// 绑定交换机
channel.exchangeDeclare("logs_topic","topic");
// 创建一个临时的队列
String queueName = channel.queueDeclare().getQueue();
// 绑定交换机和临时队列(绑定多种类型的 routing key)
channel.queueBind(queueName,"logs_topic","*");
// 消费消息
channel.basicConsume(queueName,true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者-1" + new String(body));
}
});
}
}